flatten Modifier
The flatten modifier removes a level of nesting from data of type bag or tuple. For bags, flatten produces a cross product of every item in the bag with all of the other expressions in the generate clause.
players = load '/baseball.txt' as (name:chararray, team:chararray, position:bag{t:(p:chararray)}, bat:map[]);
pos = foreach players generate name, flatten(position) as position;
limit10 = limit pos 10;
dump limit10;
(Jorge Posada,Catcher)
(Jorge Posada,Designated_hitter)
(Landon Powell,Catcher)
(Landon Powell,First_baseman)
(Martin Prado,Second_baseman)
(Martin Prado,Infielder)
(Martin Prado,Left_fielder)
(David Price,Starting_pitcher)
(David Price,Pitcher)
(Jason Pridie,Outfielder)
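The cross-product behavior of flatten can be sketched in plain Python (the helper and sample rows below are illustrative, not Pig internals):

```python
# Sketch of flatten semantics: each element of the bag is paired with the
# other generated expressions, yielding one output record per bag element.
def flatten_bag(records):
    """records: list of (name, positions_bag) tuples."""
    out = []
    for name, positions in records:
        for pos in positions:
            out.append((name, pos))
    return out

players = [
    ("Jorge Posada", ["Catcher", "Designated_hitter"]),
    ("Landon Powell", ["Catcher", "First_baseman"]),
]
print(flatten_bag(players))
```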
Group by position after the flatten:
players = load '/baseball.txt' as (name:chararray, team:chararray, position:bag{t:(p:chararray)}, bat:map[]);
pos = foreach players generate name, flatten(position) as position;
pstn_group = group pos by position;
dump pstn_group;
foreach
A nested foreach (or inner foreach) applies a set of relational operations to each record in the data pipeline inside a foreach statement. For example, find the number of unique entries in each group:
daily = load '/NYSE_daily.txt' as (exchange, symbol);
grpd = group daily by exchange;
uniqcnt = foreach grpd {
sym = daily.symbol;
uniq_sym = distinct sym;
generate group, COUNT(uniq_sym);
};
dump uniqcnt;
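The distinct-then-count pattern inside the foreach can be mimicked in Python to make the semantics concrete (made-up rows; not how Pig actually executes it):

```python
from collections import defaultdict

def distinct_count(rows):
    """rows: (exchange, symbol) pairs. Returns exchange -> number of
    unique symbols, mirroring distinct followed by COUNT per group."""
    groups = defaultdict(set)
    for exchange, symbol in rows:
        groups[exchange].add(symbol)   # a set keeps only distinct symbols
    return {ex: len(syms) for ex, syms in groups.items()}
```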
We can modify the code slightly to find the number of entries for each stock symbol:
daily = load '/NYSE_daily.txt' as (exchange, symbol);
grpd = group daily by symbol;
symbolcnt = foreach grpd {
sym = daily.symbol;
generate group, COUNT(sym);
};
dump symbolcnt;
Find the top five dividends for each stock symbol:
divs = load '/NYSE_dividends.txt' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
grpd = group divs by symbol;
top5 = foreach grpd {
sorted = order divs by dividends desc;
tops = limit sorted 5;
generate group, tops.dividends;
};
dump top5;
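The order ... desc plus limit pattern inside the foreach is a per-group top-N. A Python sketch with heapq (the function name and pair layout are illustrative):

```python
import heapq
from collections import defaultdict

def top_dividends(rows, n=5):
    """rows: (symbol, dividend) pairs. Returns symbol -> the n largest
    dividends in descending order, mirroring order ... desc + limit n."""
    groups = defaultdict(list)
    for symbol, dividend in rows:
        groups[symbol].append(dividend)
    return {sym: heapq.nlargest(n, vals) for sym, vals in groups.items()}
```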
JOIN Implementations
When joining a small file with a big file, it makes sense to send the small file to every machine (node), load it into memory, and then do the join by streaming through the large file and looking up each record in the small one. This is called a fragment-replicate join (because you fragment one file and replicate the other). It supports only inner and left outer joins.
daily = load '/NYSE_daily.txt' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
divs = load '/NYSE_dividends.txt' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
jnd = join daily by (exchange, symbol), divs by (exchange, symbol) using 'replicated';
The using 'replicated' clause tells Pig to use the fragment-replicate algorithm. The second input listed in the join is always the one loaded into memory. If Pig cannot fit the replicated input into memory, it will issue an error and fail.
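The build-then-probe idea behind fragment-replicate can be sketched in Python (hypothetical record layout; Pig's actual implementation differs):

```python
from collections import defaultdict

def fragment_replicate_join(big, small, key):
    """Inner join: build an in-memory index of the small (replicated)
    input, then stream the big (fragmented) input and probe the index."""
    index = defaultdict(list)
    for rec in small:
        index[key(rec)].append(rec)
    for rec in big:
        for match in index.get(key(rec), []):
            yield rec + match
```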
Sometimes the data you will be processing with Pig has significant skew in the number of records per key. Pig’s default join algorithm is very sensitive to skew, because it collects all of the records for a given key together on a single reducer. In some datasets, there are a few keys that have far more records than other keys. This results in a few reducers that will take much longer than the rest. To deal with this, Pig provides skew join.
users = load 'users' as (name:chararray, city:chararray);
cinfo = load 'cityinfo' as (city:chararray, population:int);
jnd = join cinfo by city, users by city using 'skewed';
The second input in the join, in this case users, is the one that will be sampled and have its heavily skewed keys split across reducers. The first input will have records with those key values replicated across reducers.
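The routing idea can be sketched in Python. Note the simplifying assumptions: the heavy keys are passed in directly, whereas Pig discovers them by sampling, and "reducers" are just dictionary partitions here:

```python
from collections import defaultdict

def skew_join(skewed, other, key_s, key_o, heavy_keys, n_reducers=3):
    """Heavy keys from the sampled (skewed) input are spread round-robin
    over several reducers; matching records from the other input are
    replicated to all of them. heavy_keys is assumed known up front."""
    parts = defaultdict(lambda: ([], []))  # (key, reducer id) -> (skewed recs, other recs)
    counter = defaultdict(int)
    for rec in skewed:
        k = key_s(rec)
        r = counter[k] % n_reducers if k in heavy_keys else 0
        counter[k] += 1
        parts[(k, r)][0].append(rec)
    for rec in other:
        k = key_o(rec)
        targets = range(n_reducers) if k in heavy_keys else [0]
        for t in targets:
            parts[(k, t)][1].append(rec)
    # Each partition then performs an ordinary local join.
    return [(s, o) for sk, ot in parts.values() for s in sk for o in ot]
```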
If your inputs are already sorted on the join key, the join can be done efficiently in the map phase by opening both files and walking through them. Pig refers to this as a merge join. This can be done without a reduce phase, and so it is more efficient than a default join. For example:
daily = load '/NYSE_daily.txt' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
srtd = order daily by symbol;
divs = load '/NYSE_dividends.txt' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
dsrtd = order divs by symbol;
jnd = join srtd by symbol, dsrtd by symbol using 'merge';
dump jnd;
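Walking two pre-sorted inputs in step can be sketched as follows (a simplified single-process version; Pig runs this logic per map task):

```python
def merge_join(left, right, key):
    """Inner join of two lists that are ALREADY sorted on the join key.
    Advances two cursors in step and emits the cross product of each
    run of records sharing a key; no shuffle/reduce phase is needed."""
    i = j = 0
    out = []
    while i < len(left) and j < len(right):
        kl, kr = key(left[i]), key(right[j])
        if kl < kr:
            i += 1
        elif kl > kr:
            j += 1
        else:
            # Find the run of equal keys on each side.
            i2 = i
            while i2 < len(left) and key(left[i2]) == kl:
                i2 += 1
            j2 = j
            while j2 < len(right) and key(right[j2]) == kl:
                j2 += 1
            for l in left[i:i2]:
                for r in right[j:j2]:
                    out.append(l + r)
            i, j = i2, j2
    return out
```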
Cogroup
The cogroup operation is a generalization of group. Instead of collecting records of one input, it collects records of n inputs based on a key. The result is a record with a key and one bag for each input. Each bag contains all records from that input that have the given value for the key. All records with a null value in the key will be collected together.
daily = load '/NYSE_daily.txt' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
divs = load '/NYSE_dividends.txt' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
grpd = cogroup daily by (exchange, symbol), divs by (exchange, symbol);
dump grpd;
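A two-input cogroup can be sketched in Python (illustrative; note that a None key would simply group together like any other value here, matching the null-key behavior described above):

```python
from collections import defaultdict

def cogroup(a, b, key):
    """For each key value, collect one bag of records from each input.
    Keys that appear in only one input get an empty bag for the other."""
    bags_a, bags_b = defaultdict(list), defaultdict(list)
    for rec in a:
        bags_a[key(rec)].append(rec)
    for rec in b:
        bags_b[key(rec)].append(rec)
    keys = set(bags_a) | set(bags_b)
    return {k: (bags_a[k], bags_b[k]) for k in keys}
```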
cogroup with foreach
When cogroup is used with foreach, where each bag is flattened, it is equivalent to a join, as long as there are no null values in the keys.
daily = load '/NYSE_daily.txt' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
divs = load '/NYSE_dividends.txt' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
grpd = cogroup daily by (exchange, symbol), divs by (exchange, symbol);
sjnd = filter grpd by not IsEmpty(divs);
final = foreach sjnd generate flatten(daily);
dump final;
Union
Both inputs share the same schema -> the result has that schema:
A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:float);
C = union A, B;
describe C;
C: {x: int, y: float}
One schema can be produced from the other by implicit casts -> the result has that wider schema:
A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:double);
C = union A, B;
describe C;
C: {x: int, y: double}
Otherwise -> no schema (i.e., different records have different fields):
A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:chararray);
C = union A, B;
describe C;
Schema for C unknown
Note: The schema comparison includes names, so different field names will result in the output having no schema.
Cross
The cross operation combines every record in one relation with every record in another, producing output with \(n\times m\) records. Pig implements cross in parallel.
daily = load '/NYSE_daily.txt' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
divs = load '/NYSE_dividends.txt' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
crosseddata = cross daily, divs parallel 10;
toshow = limit crosseddata 10;
dump toshow;
(Note: If not limited, this code takes a long time to run locally because of the \(n\times m\) records it must create and output.)
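The quadratic blow-up is easy to see with itertools.product (made-up rows for illustration):

```python
import itertools

# Cross product of two tiny relations; output size is len(daily) * len(divs).
daily = [("NYSE", "CSX", "2009-12-30"), ("NYSE", "CPO", "2009-12-30")]
divs = [("NYSE", "CSX", "2009-10-06", 0.22)]
crossed = [d + v for d, v in itertools.product(daily, divs)]
print(len(crossed))  # 2 * 1 = 2 records
```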
Parameter Substitution
Parameter substitution provides basic string-replacement functionality in Pig Latin. The following script is saved in a file named dly.pig (in HDFS).
daily = load '/NYSE_daily.txt' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
yesterday = filter daily by date == '$DATE';
grpd = group yesterday all;
minmax = foreach grpd generate MAX(yesterday.high), MIN(yesterday.low);
dump minmax;
When you run dly.pig, you need to provide a definition for the parameter DATE, as follows:
pig -param DATE=2009-12-30 dly.pig
Parameters are referenced inside the script preceded by a $ (dollar sign).